package com.base.mq.consumer;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * Minimal RocketMQ push-consumer demo: subscribes to a single topic/tag pair
 * and prints every received message to standard output.
 */
public class ConsumerDemo {

	/** Consumer group name passed to the {@link DefaultMQPushConsumer} constructor. */
	private static final String CONSUMER_GROUP = "test-consumer";

	/** Name server address ({@code host:port}) the consumer is pointed at. */
	private static final String NAMESRV_ADDR = "192.168.189.152:9877";

	/** Topic to subscribe to. */
	private static final String TOPIC = "test-topic";

	/**
	 * Subscription expression. Only messages tagged {@code add} are requested;
	 * use {@code "*"} instead to receive every message on the topic.
	 */
	private static final String TAG_EXPRESSION = "add";

	/**
	 * Configures the consumer, registers a listener that prints each message,
	 * and starts the consumer.
	 *
	 * @param args command-line arguments (unused)
	 * @throws Exception if subscribing or starting the consumer fails
	 */
	public static void main(String[] args) throws Exception {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
		consumer.setNamesrvAddr(NAMESRV_ADDR);

		// Subscribe to the topic, filtered by TAG_EXPRESSION.
		consumer.subscribe(TOPIC, TAG_EXPRESSION);

		consumer.registerMessageListener(new MessageListenerConcurrently() {

			/**
			 * Prints the UTF-8 decoded body of each message, then the whole batch,
			 * and reports {@code CONSUME_SUCCESS} for every batch.
			 */
			@Override
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				for (MessageExt ext : msgs) {
					// StandardCharsets.UTF_8 avoids the checked UnsupportedEncodingException
					// that the charset-name overload of new String(...) declares.
					System.out.println("消息提》》》》》" + new String(ext.getBody(), StandardCharsets.UTF_8));
				}
				System.out.println("接收到的消息》》" + msgs);
				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});

		consumer.start();
	}
}
